package com.yangm.consumer;

import com.yangm.core.MiniRpcRequest;
import com.yangm.core.RpcServiceHelper;
import com.yangm.core.ServiceMeta;
import com.yangm.coder.MiniRpcDecoder;
import com.yangm.coder.MiniRpcEncoder;
import com.yangm.handler.RpcResponseHandler;
import com.yangm.protocol.MiniRpcProtocol;
import com.yangm.registry.RegistryService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RpcConsumer {
    private final Bootstrap bootstrap;

    private final EventLoopGroup eventLoopGroup;

    public RpcConsumer() {
        bootstrap = new Bootstrap();
        eventLoopGroup = new NioEventLoopGroup(4);
        //构建请求客户端
        bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline()
                                .addLast(new MiniRpcEncoder())
                                .addLast(new MiniRpcDecoder())
                                .addLast(new RpcResponseHandler());
                    }
                });
    }

    public void sendRequest(MiniRpcProtocol<MiniRpcRequest> protocol, RegistryService registryService) throws Exception {
        MiniRpcRequest request = protocol.getBody();
        Object[] params = request.getParams();
        String serviceKey = RpcServiceHelper.buildServiceKey(request.getClassName(), request.getServiceVersion());

        //根据第一个参数来设置 invoker hashcode
        int invokerHashCode = params.length > 0 ? params[0].hashCode() : serviceKey.hashCode();
        ServiceMeta serviceMeta = registryService.discovery(serviceKey, invokerHashCode);
        if (null != serviceMeta) {

            log.info("connecting rpc server {} on port {}....", serviceMeta.getServiceAddress(), serviceMeta.getServicePort());
            ChannelFuture future = bootstrap.connect(serviceMeta.getServiceAddress(), serviceMeta.getServicePort()).sync();
            future.addListener((ChannelFutureListener) cfl -> {
                if (future.isSuccess()) {
                    //连接成功
                    log.info("connect rpc server {} on port {} success.", serviceMeta.getServiceAddress(), serviceMeta.getServicePort());
                } else {
                    log.info("connect rpc server {} on port {} failed.", serviceMeta.getServiceAddress(), serviceMeta.getServicePort());
                    future.cause().printStackTrace();
                    eventLoopGroup.shutdownGracefully();
                }
            });
            future.channel().writeAndFlush(protocol);
            log.info("below  wrote...{}",future.channel());

        }

    }
}
